#!/usr/bin/env python
"""An example of how one can partition a large file and process it in parallel.
The core function is `partition`.  Also look at how multiprocessing's
starmap_async is used to get the result.

"""
# Copyright 2018 - Stefano Mazzucco <stefano - AT - curso - DOT - re>
# License: Apache 2.0
# https://www.apache.org/licenses/LICENSE-2.0.txt

import mmap
import os


def partition(size, parts, overlap):
    """Compute the starting offset of every part of a file.

    `size` is the file size in bytes, `parts` the number of chunks the file
    is split into and `overlap` the number of bytes by which consecutive
    chunks should overlap.

    Returns a list with `parts` offsets.  Each one is rounded down to a
    multiple of `mmap.ALLOCATIONGRANULARITY` so that it can be used as the
    `offset` of a memory map.
    """
    granularity = mmap.ALLOCATIONGRANULARITY
    # Distance between the (unaligned) starts of two consecutive parts.
    stride = size // parts - overlap + 1

    offsets = []
    for index in range(parts):
        raw = index * stride
        # Align down to the closest allocation boundary.
        offsets.append(raw - raw % granularity)
    return offsets


def process(fd, length, offset, access):
    """Process one chunk of the file behind file descriptor `fd`.

    The chunk is memory-mapped for `length` bytes (0 maps up to the end of
    the file), starting at `offset` and using `access` permissions.

    When `offset` is 0 the first line is treated as a header and printed.
    Returns the next line read from the chunk, as bytes.
    """
    # mmap cannot be pickled, so we create it in the sub-processes.
    # The context manager closes the mapping as soon as we are done with it
    # instead of leaving that to garbage collection.
    with mmap.mmap(fd, length=length, offset=offset, access=access) as m:
        # Note that we don't have any guarantee that the first line is
        # complete, excluding the first chunk of the file.
        if offset == 0:
            header = m.readline()   # Say this is a CSV file with headers.
            print('HEADER', header)
        # Just an example: readline() returns a bytes copy, so the value
        # stays valid after the map is closed.
        return m.readline()


def parse_args(argv=None):
    """Parse the command line and return the resulting namespace.

    `argv` is the list of arguments to parse; when None, argparse reads
    them from `sys.argv`.

    The namespace has two attributes: `fname`, the file to be processed,
    and `parts`, the number of parts to split the processing into
    (a positive integer, 3 by default).  Invalid input makes argparse
    print a usage error and exit.
    """
    from argparse import ArgumentParser, ArgumentTypeError

    def positive_int(text):
        # Zero would cause a division by zero when computing the part
        # length and a negative value would schedule no work at all, so
        # both are rejected up front with a proper usage error.
        value = int(text)
        if value < 1:
            raise ArgumentTypeError(
                f'must be a positive integer, got {text!r}'
            )
        return value

    parser = ArgumentParser(
        description='Process file in parallel.'
    )

    parser.add_argument(
        'fname',
        help='File to be processed',
    )

    parser.add_argument(
        '--parts',
        help='How many parts to split the processing into. Defaults to 3.',
        default=3,
        metavar='N',
        type=positive_int,
    )

    return parser.parse_args(argv)


if __name__ == '__main__':
    # Script entry point: split the input file into overlapping parts and
    # hand each part to a pool of worker processes.

    from multiprocessing import Pool, TimeoutError

    arguments = parse_args()
    input_file = arguments.fname
    parts = arguments.parts

    with open(input_file, 'rb') as f:

        # Twice the length of the first line should be a fair overlap.
        overlap = 2 * len(f.readline())

        fd = f.fileno()
        size = os.fstat(fd).st_size

        offsets = partition(size, parts, overlap)
        length = size // parts

        last = parts - 1
        # One argument tuple per part, in the order expected by `process`.
        # NOTE(review): the raw file descriptor is handed to the workers;
        # presumably this relies on the workers inheriting it (fork start
        # method) -- confirm on platforms that default to spawn.
        args = [
            (
                fd,
                # Last part maps to end of file.
                0 if part == last else length,
                offset,
                mmap.ACCESS_READ
            )
            for part, offset in enumerate(offsets)
        ]

        # os.sched_getaffinity only exists on some Unix platforms; fall
        # back to the total CPU count (or 1 if unknown) everywhere else.
        try:
            cpus = len(os.sched_getaffinity(0))
        except AttributeError:
            cpus = os.cpu_count() or 1
        workers = cpus + 1

        def callback(result):
            # Called in the parent with the list of all results on success.
            print('CALLBACK', result)

        def error_callback(error):
            # Called in the parent with the exception raised by a worker.
            print('ERROR CALLBACK', error)

        with Pool(workers) as pool:
            result = pool.starmap_async(
                process,
                args,
                callback=callback,
                error_callback=error_callback
            )

            # Wait for the results, but give up after `timeout` seconds.
            timeout = 2
            try:
                outcome = result.get(timeout=timeout)
                print('OUTCOME', outcome)
            except TimeoutError:
                print(f'Timed out after {timeout} seconds')
